package org.budo.warehouse.logic.consumer;

import java.util.List;

import javax.annotation.Resource;

import org.budo.support.dao.page.Page;
import org.budo.support.lang.util.ListUtil;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.IEventFilterLogic;
import org.budo.warehouse.logic.bean.LogicDynamicBeanProvider;
import org.budo.warehouse.logic.util.DataMessageLogicUtil;
import org.budo.warehouse.service.api.IPipelineService;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author limingwei
 */
@Slf4j
@Component("dispatcherDataConsumer")
public class DispatcherDataConsumer implements DataConsumer {
    @Resource
    private IPipelineService pipelineService;

    @Resource
    private LogicDynamicBeanProvider logicDynamicBeanProvider;

    @Resource
    private IEventFilterLogic eventFilterLogic;

    @Override
    public void consume(DataMessage dataMessage) {
        Integer dataNodeId = dataMessage.getDataNodeId();
        List<Pipeline> pipelines = pipelineService.listBySourceDataNodeCached(dataNodeId, Page.max());

        if (null == pipelines || pipelines.isEmpty()) {
            log.error("#40 pipelines=" + pipelines + ", dataMessage=" + DataMessageLogicUtil.toSimpleString(dataMessage));
            return;
        }

        for (Pipeline pipeline : pipelines) {
            DataMessage filteredMessage = eventFilterLogic.filter(pipeline, dataMessage);
            if (null == filteredMessage || ListUtil.isNullOrEmpty(filteredMessage.getDataEntries())) {
                continue; // 事件过滤
            }

            this.consume(pipeline, filteredMessage);
        }
    }

    private void consume(Pipeline pipeline, DataMessage dataMessage) {
        DataConsumer dataConsumer = logicDynamicBeanProvider.dataConsumer(pipeline);

        dataConsumer.consume(dataMessage);
    }
}